服务端代码
1 | type TigerServer struct {} |
从以上代码可以看出,开启一个gRPC服务的代码非常简洁
- 实例化一个grpc.Server
- 将实现了pb文件中service接口的服务对象注册进grpc.Server
- 监听端口
- 接收连接请求
实例化server
首先我们来看实例化一个server做了些什么操作
1 | // NewServer creates a gRPC server which has no service registered and has not |
可以看出,这个函数与客户端Dial的处理流程很相似,都是接收一个可变参数用于覆盖默认配置,然后是初始化grpc.Server结构体,拦截器相关处理及一些grpc调用监控相关的初始化处理。
这里最关键的就是初始化grpc.Server结构体了
1 | type Server struct { |
服务端可选配置项
1 | type serverOptions struct { |
将注册服务对象注册进server
将服务对象注册进server的函数是pb文件中用protoc代码生成插件protoc-gen-go工具自动生成的。
pb文件中服务端相关代码
1 | // 服务描述 |
以下代码可以看出,实际上还是调用grpc包中的注册方法
1 | // RegisterService registers a service and its implementation to the gRPC |
可以看出,register主要是将服务名和具体服务信息添加进Server的属性映射m,也就是服务名与具体服务信息的映射,相当于是http web中的路由表。
server开始监听并接收请求
1 | // Serve accepts incoming connections on the listener lis, creating a new |
从代码可以看出,每个请求进来,Server会起一个goroutine去处理这个请求,真正处理客户端请求的是 handleRawConn 方法。
1 | // handleRawConn forks a goroutine to handle a just-accepted connection that |
1 | // 每个http2连接在服务端会生成一个ServerTransport,这里是 htt2server |
1 | // NewServerTransport creates a ServerTransport with conn or non-nil error |
1 | // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is |
一个HTTP2请求进来,先HTT2协议握手建立HTT2连接(处理建立连接过程中的帧数据),然后一直循环处理新的流的建立(新的HTTP2请求到来)和数据帧的收发(基于HTTP2的多路复用,客户端可以使用同一条链接同时发送多个请求)。HTTP2链接在代码层面为ServerTransport,也就是http2Server
接下来我们看下serveStreams方法
1 | func (s *Server) serveStreams(st transport.ServerTransport) { |
可以看出,这里是使用waitGroup进行阻塞等待,不断处理同一个http2连接的请求。
HandleStreams使用注册的Handler处理请求streams
1 | HandleStreams(func(*Stream), func(context.Context, string) context.Context) |
这是一个接口方法,这里的具体实现是http2Server的HandleStreams方法,传入一个处理stream的handler处理函数,一个trace处理函数
1 | func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) { |
可以看出该方法循环读取客户端连接发送过来的帧,如果是HEADER帧,说明有新的rpc请求进来,回调handler;如果是DATA帧,将数据分发到stream;如果是RST帧…
接着我们来看下server的handleStream方法,该方法处理新的rpc请求
1 | func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) { |
接下来我们看下普通一元rpc请求的processUnaryRPC(去除掉一些与主要流程没有太大关系的非核心代码,流式rpc处理方法就不贴出来的,思路是差不多的)
1 | func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) { |
上面这段代码的主要逻辑就是从stream中读取req请求,反序列化后调用methodDesc中的handler方法,处理完请求后将响应序列化然后写入stream返回给客户端。
我们看一下消息解析的方法,这个方法是从buf中解析原始的数据转为protobuf序列化后的数据格式
1 | func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) { |
我们再来看下上面提到的proto代码工具自动生成的handler方法(以HelloTiger为例)
1 | func _TigerService_HelloTiger_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { |
我们再来看下中的NewContextWithServerTransportStream函数。
1 | //该函数基于现有ctx生成新的context,并将stream保存到上面 |
我们从代码中可以看出,调用NewContextWithServerTransportStream函数时传入的是当前请求的stream.context()
我们再来看下 stream.ctx的生成
1 | // operateHeader takes action on the decoded headers. |
当收到一个Header帧,就表明有新的rpc请求到来,这时候就会解析header帧并创建stream,在创建stream的时候,会把用户自定义的header字段保存到stream.context中
接下来看一下返回给客户端响应的处理
1 | func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error { |
1 | // Write converts the data into HTTP2 data frame and sends it out. Non-nil error |